Dimensionality reduction only works when the inputs are correlated (like images from the same domain); it fails if we feed in completely random inputs every time we train the autoencoder. So, in the end, the encoder produces a lower-dimensional representation of its input, much like Principal Component Analysis (PCA). And since we don’t need any labels during training, an autoencoder is an unsupervised model as well.
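For intuition, here is a minimal sketch (not part of the notebook) of the PCA analogy using NumPy's SVD: projecting onto the top principal components gives a linear compression and reconstruction, whereas the autoencoder built below learns a non-linear one. All variable names here are illustrative.

import numpy as np

# toy data: 100 correlated 10-dimensional samples (illustrative only)
rng = np.random.RandomState(0)
latent = rng.randn(100, 2)                   # two underlying factors
mixing = rng.randn(2, 10)
samples = latent @ mixing + 0.05 * rng.randn(100, 10)

# PCA via SVD: keep the top-2 components as the "code"
centered = samples - samples.mean(axis=0)
_, _, vt = np.linalg.svd(centered, full_matrices=False)
code_pca = centered @ vt[:2].T                         # analogous to the encoder output
reconstruction = code_pca @ vt[:2] + samples.mean(axis=0)  # analogous to the decoder output
print(np.mean((samples - reconstruction) ** 2))        # small reconstruction error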
In [5]:
import os
from random import randint
from collections import Counter
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import numpy as np
import tensorflow as tf
In [182]:
corpus = "the quick brown fox jumped over the lazy dog from the quick tall fox".split()
test_corpus = "the quick brown fox jumped over the lazy dog from the quick tall fox".split()
corpus[:10]
Out[182]:
In [183]:
def build_vocab(words, vocab_size):
    """ Build vocabulary of VOCAB_SIZE most frequent words """
    dictionary = dict()
    count = [('UNK', -1)]
    count.extend(Counter(words).most_common(vocab_size - 1))
    index = 0
    for word, _ in count:
        dictionary[word] = index
        index += 1
    index_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
    return dictionary, index_dictionary
In [184]:
vocabulary, reverse_vocabulary = build_vocab(corpus, 100)
In [185]:
vocabulary
Out[185]:
In [186]:
def index_words_in_corpus(corpus):
    return [vocabulary[token] if token in vocabulary else 0 for token in corpus]
In [187]:
corpus = index_words_in_corpus(corpus)
test_corpus = index_words_in_corpus(test_corpus)
In [188]:
test_corpus
Out[188]:
In [189]:
vocabulary_size = len(vocabulary)
vocabulary_size
Out[189]:
In [190]:
def one_hot_encode(index):
    row = np.zeros(vocabulary_size, dtype=np.int32)
    row[index] = 1
    return row
In [191]:
data = np.array([one_hot_encode(i) for i in corpus])
test_data = np.array([one_hot_encode(i) for i in test_corpus])
In [192]:
print("(TRAIN: Total number of words, Vocabulary size):", data.shape)
print("(TEST: Total number of words, Vocabulary size):", test_data.shape)
In [193]:
data[randint(0, data.shape[0] - 1)]  # random.randint is inclusive on both ends
Out[193]:
In [194]:
X = tf.placeholder(tf.float32, shape=(None, vocabulary_size))
Y = tf.placeholder(tf.float32, shape=(None, vocabulary_size))
In [196]:
# encoder: vocabulary_size -> 1000 -> 250 -> 50-dimensional code
w1 = tf.Variable(tf.random_normal(shape=(vocabulary_size, 1000), stddev=0.01), name='weights1')
b1 = tf.Variable(tf.zeros([1, 1000]), name="bias1")
layer1 = tf.nn.relu(tf.add(tf.matmul(X, w1), b1))
w2 = tf.Variable(tf.random_normal(shape=(1000, 250), stddev=0.01), name='weights2')
b2 = tf.Variable(tf.zeros([1, 250]), name="bias2")
layer2 = tf.nn.relu(tf.add(tf.matmul(layer1, w2), b2))
w = tf.Variable(tf.random_normal(shape=(250, 50), stddev=0.01), name='weights')
b = tf.Variable(tf.zeros([1, 50]), name="bias")
code = tf.nn.relu(tf.add(tf.matmul(layer2, w), b))
# decoder: 50 -> 250 -> 1000 -> vocabulary_size, mirroring the encoder
w3 = tf.Variable(tf.random_normal(shape=(50, 250), stddev=0.01), name='weights3')
b3 = tf.Variable(tf.zeros([1, 250]), name="bias3")
layer3 = tf.nn.relu(tf.add(tf.matmul(code, w3), b3))
w4 = tf.Variable(tf.random_normal(shape=(250, 1000), stddev=0.01), name='weights4')
b4 = tf.Variable(tf.zeros([1, 1000]), name="bias4")
layer4 = tf.nn.relu(tf.add(tf.matmul(layer3, w4), b4))
w5 = tf.Variable(tf.random_normal(shape=(1000, vocabulary_size), stddev=0.01), name='weights5')
b5 = tf.Variable(tf.zeros([1, vocabulary_size]), name="bias5")
decoder = tf.nn.sigmoid(tf.add(tf.matmul(layer4, w5), b5))
In [197]:
# entropy = tf.nn.softmax_cross_entropy_with_logits(logits=decoder, labels=Y)
# mean squared reconstruction error between the input X and the decoder output (the Y placeholder is unused)
loss = tf.reduce_mean(tf.pow(X - decoder, 2))
In [198]:
LEARNING_RATE = 0.01
NUM_TRAIN_STEPS = 1000
SKIP_STEP = 10 # how many steps to skip before reporting the loss
In [203]:
optimizer = tf.train.RMSPropOptimizer(learning_rate=LEARNING_RATE).minimize(loss)
init = tf.global_variables_initializer()
In [205]:
with tf.Session() as sess:
    sess.run(init)
    for i in range(NUM_TRAIN_STEPS):
        _, loss_val = sess.run([optimizer, loss], feed_dict={X: data})
        if i % SKIP_STEP == 0:
            print("EPOCH {}/{}, LOSS {}".format(i, NUM_TRAIN_STEPS, loss_val))
    # note: this evaluates the decoder, i.e. the reconstruction of test_data
    # (same dimensionality as the input), not the 50-dimensional code
    test_data_compressed = sess.run(decoder, feed_dict={X: test_data})
    # np.save(outfile, test_data_compressed)
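If the goal is the compressed 50-dimensional representation itself rather than the reconstruction, the `code` tensor can be evaluated in the same session, for example by adding a line like the following inside the `with` block above (the name `test_codes` is only illustrative):

    test_codes = sess.run(code, feed_dict={X: test_data})  # shape: (number of test words, 50)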
In [206]:
test_data_compressed.shape
Out[206]:
In [207]:
test_data_compressed
Out[207]:
In [208]:
# binarize the reconstruction: treat probabilities above 0.5 as "word present"
# (the sigmoid output is strictly positive, so a > 0 cutoff would mark every entry)
test_data_compressed[test_data_compressed > 0.5] = 1
In [209]:
test_data_compressed
Out[209]:
In [210]:
test_data
Out[210]:
In [212]:
sent = test_data_compressed[0].tolist()
print(' '.join(reverse_vocabulary[i] for i in range(len(sent)) if sent[i] == 1.))
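For comparison, the original test sentence can be rebuilt directly from the index list produced earlier; a small sketch reusing `reverse_vocabulary` and `test_corpus` from the cells above:

print(' '.join(reverse_vocabulary[i] for i in test_corpus))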
In [ ]: